package com.my.service.task;

import com.my.service.task.entity.BaijiaInfo;
import com.my.service.task.map.BaijiaMap;
import com.my.service.task.reduce.BaijiaReducer;
import com.my.service.task.util.DateUtils;
import com.my.service.task.util.HbaseUtils;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;

public class BaijiaTask {
    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        DataSet<String> text = env.readTextFile(params.get("input"));
        DataSet<BaijiaInfo> mapResult = text.map(new BaijiaMap());
        DataSet<BaijiaInfo> reduceResult = mapResult.groupBy("groupfield").reduce(new BaijiaReducer());
        List<BaijiaInfo> resultList = reduceResult.collect();
        for (BaijiaInfo bInfo : resultList) {
            String userid = bInfo.getUserid();
            List<BaijiaInfo> list = bInfo.getList();
            Collections.sort(list, (o1, o2) -> {
                String time1 = o1.getCreatetime();
                String time2 = o2.getCreatetime();
                SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
                int res = 0;
                try {
                    res = dateFormat.parse(time1).compareTo(dateFormat.parse(time2));
                } catch (ParseException e) {
                    e.printStackTrace();
                }
                return res;
            });
            // 计算当前userid的下单间隔(相当于频率的倒数)
            // hashmap记录的是 隔多少天 的下单次数，比如3天下单多少次，4天下单多少次
            HashMap<Integer, Integer> frequencyMap = new HashMap<>();
            // 计算当前userid的最大下单金额
            double maxAmount = 0d;
            // 计算当前userid的下单金额总额
            double sumAmount = 0d;
            BaijiaInfo prev = null;
            for (BaijiaInfo bInfoInner : list) {
                if (prev == null) {
                    prev = bInfoInner;
                    continue;
                }
                // 计算频率
                String beforetime = prev.getCreatetime();
                String endtime = bInfoInner.getCreatetime();
                int days = DateUtils.getDaysBetweenbyStartAndend(beforetime, endtime, "yyyyMMddHHmmss");
                frequencyMap.put(days, frequencyMap.getOrDefault(days, 0) + 1);

                // 计算最大金额
                Double totalAmount = Double.valueOf(bInfoInner.getTotalamount());
                if (totalAmount > maxAmount) {
                    maxAmount = totalAmount;
                }

                // 计算总金额
                sumAmount += totalAmount;
                prev = bInfoInner;
            }
            // 计算平均金额
            double avgAmount = sumAmount / list.size();

            // 计算下单频率
            int days = 0;
            for (Map.Entry<Integer, Integer> tmp : frequencyMap.entrySet()) {
                days += tmp.getKey() * tmp.getValue();
            }
            double avgDays = (double) days / list.size();


            // 根据平均金额 计算平均金额得分
            int avgmountScore = 0;
            if (avgAmount >= 0 && avgAmount < 20) {
                avgmountScore = 5;
            } else if (avgAmount >= 20 && avgAmount < 60) {
                avgmountScore = 10;
            } else if (avgAmount >= 60 && avgAmount < 100) {
                avgmountScore = 20;
            } else if (avgAmount >= 100 && avgAmount < 150) {
                avgmountScore = 30;
            } else if (avgAmount >= 150 && avgAmount < 200) {
                avgmountScore = 40;
            } else if (avgAmount >= 200 && avgAmount < 250) {
                avgmountScore = 60;
            } else if (avgAmount >= 250 && avgAmount < 350) {
                avgmountScore = 70;
            } else if (avgAmount >= 350 && avgAmount < 450) {
                avgmountScore = 80;
            } else if (avgAmount >= 450 && avgAmount < 600) {
                avgmountScore = 90;
            } else if (avgAmount >= 600) {
                avgmountScore = 100;
            }

            // 根据最大金额 计算最大金额得分
            int maxAmountScore = 0;
            if (maxAmount >= 0 && maxAmount < 20) {
                maxAmountScore = 5;
            } else if (maxAmount >= 20 && maxAmount < 60) {
                maxAmountScore = 10;
            } else if (maxAmount >= 60 && maxAmount < 200) {
                maxAmountScore = 30;
            } else if (maxAmount >= 200 && maxAmount < 500) {
                maxAmountScore = 60;
            } else if (maxAmount >= 500 && maxAmount < 700) {
                maxAmountScore = 80;
            } else if (maxAmount >= 700) {
                maxAmountScore = 100;
            }

            // 根据下单频率 计算下单频率得分
            int avgDaysScore = 0;
            if (avgDays >= 0 && avgDays < 5) {
                avgDaysScore = 100;
            } else if (avgAmount >= 5 && avgAmount < 10) {
                avgDaysScore = 90;
            } else if (avgAmount >= 10 && avgAmount < 30) {
                avgDaysScore = 70;
            } else if (avgAmount >= 30 && avgAmount < 60) {
                avgDaysScore = 60;
            } else if (avgAmount >= 60 && avgAmount < 80) {
                avgDaysScore = 40;
            } else if (avgAmount >= 80 && avgAmount < 100) {
                avgDaysScore = 20;
            } else if (avgAmount >= 100) {
                avgDaysScore = 10;
            }


            // 计算败家指数
            double totalscore = (avgmountScore / 100) * 30 + (maxAmountScore / 100) * 30 + (avgDaysScore / 100) * 40;

            String tablename = "userflaginfo";
            String rowkey = userid;
            String famliyname = "baseinfo";
            String colum = "baijiascore";
            HbaseUtils.putdata(tablename, rowkey, famliyname, colum, totalscore + "");
        }
        env.execute("carrier analysis");
    }
}
